package cn.doitedu.etl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/12/11
 * @Tips: 学大数据，到多易教育
 * @Desc:
 *   搜索词及结果准确率点击率分析基础表计算任务
 *   (Base-table computation job for search-keyword accuracy / click-through analysis:
 *   enriches search events with standardized synonyms via an HTTP service and
 *   writes the flattened rows to a Doris table.)
 **/
public class E06_EtlJob_SearchEventsAnaJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Kafka connector source: the dimension-widened event stream produced upstream.
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                // subscribed topic
                .setTopics("mall-evts-comdim-w")
                .setGroupId("gp02")
                .setBootstrapServers("doitedu:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // FIX: the Kafka consumer property is "enable.auto.commit";
                // the previous key "auto.offset.commit" does not exist and was
                // silently ignored by the Kafka client.
                // NOTE(review): with Flink checkpointing enabled, offsets are
                // committed on checkpoints anyway; auto-commit is best-effort only.
                .setProperty("enable.auto.commit", "true")
                .build();

        DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");


        // DataStream section: its only purpose is to call an external HTTP service
        // to obtain the standardized synonym (and word split) of each search keyword.
        SingleOutputStreamOperator<SearchBean> filled = streamSource.map(value -> JSON.parseObject(value, SearchBean.class))
                // keep only search-related events
                .filter(bean -> bean.getEvent_id().equals("search") || bean.getEvent_id().equals("search_return") || bean.getEvent_id().equals("search_click"))
                // key by (user_id, search_id) so all events of one search session land together
                .keyBy(new KeySelector<SearchBean, Tuple2<Integer,String>>() {
                    @Override
                    public Tuple2<Integer, String> getKey(SearchBean bean) throws Exception {
                        return Tuple2.of(bean.getUser_id(), bean.getProperties().get("search_id"));
                    }
                })
                .process(new KeyedProcessFunction<Tuple2<Integer, String>, SearchBean, SearchBean>() {
                    CloseableHttpClient client;
                    HttpPost post;
                    // reused request-body map; only the "origin" key is ever written
                    Map<String, String> keywordEntity = new HashMap<>();

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // build the HTTP client once per task, not per element
                        client = HttpClientBuilder.create().build();
                        // pre-build the POST request; only its entity changes per element
                        post = new HttpPost("http://doitedu:8081/api/post/simwords");
                        // request headers
                        post.addHeader("Content-type", "application/json; charset=utf-8");
                        post.setHeader("Accept", "application/json");
                    }

                    @Override
                    public void close() throws Exception {
                        // FIX: release the HTTP client (and its pooled connections)
                        // when the task shuts down; it was previously leaked.
                        if (client != null) {
                            client.close();
                        }
                    }

                    @Override
                    public void processElement(SearchBean bean, KeyedProcessFunction<Tuple2<Integer, String>, SearchBean, SearchBean>.Context ctx, Collector<SearchBean> out) throws Exception {

                        // lift search_id out of the generic properties map into a first-class field
                        bean.setSearch_id(bean.getProperties().get("search_id"));

                        if (bean.getEvent_id().equals("search")) {
                            // request-body parameter: the raw keyword the user typed
                            String keyword = bean.getProperties().get("keyword");
                            keywordEntity.put("origin", keyword);
                            post.setEntity(new StringEntity(JSON.toJSONString(keywordEntity), StandardCharsets.UTF_8));

                            // FIX: close the response via try-with-resources; it was
                            // previously never closed, leaking one pooled connection
                            // per "search" event until the pool was exhausted.
                            try (CloseableHttpResponse response = client.execute(post)) {
                                // parse the service reply (EntityUtils.toString fully consumes the entity)
                                HttpEntity responseEntity = response.getEntity();
                                JSONObject jsonObject = JSON.parseObject(EntityUtils.toString(responseEntity, "utf-8"));
                                String similarWord = jsonObject.getString("similarWord");
                                String splitWords = jsonObject.getString("words");

                                // fill in keyword, word split and standardized synonym
                                bean.setKeyword(keyword);
                                bean.setSplit_words(splitWords);
                                bean.setSimilar_word(similarWord);
                            }

                        } else if (bean.getEvent_id().equals("search_return")) {
                            // number of items the search returned
                            // NOTE(review): assumes "res_cnt" is always present and numeric
                            // for search_return events — confirm against the producer.
                            bean.setReturn_item_count(Integer.parseInt(bean.getProperties().get("res_cnt")));
                        } else {
                            // search_click: clicked item's source attribute (ad-inserted vs organic)
                            // and its sequence number in the result list
                            bean.setClick_item_attr(bean.getProperties().get("item_att"));
                            bean.setClick_item_seqno(Integer.parseInt(bean.getProperties().get("item_seq")));
                        }

                        out.collect(bean);
                    }
                });

        tEnv.createTemporaryView("filled",filled);

        // Doris connector sink table (target: dwd.search_ana_base)
        tEnv.executeSql(
                " create table search_ana_base_dorissink(    "
                        +"     dt                DATE,         "
                        +"     user_id           INT,          "
                        +"     event_id          VARCHAR(20),  "
                        +"     event_time        BIGINT,       "
                        +"     search_id         VARCHAR(16),  "
                        +"     keyword           VARCHAR(40),  "
                        +"     split_words       VARCHAR(40),  "
                        +"     similar_word      VARCHAR(40),  "
                        +"     return_item_count INT,          "
                        +"     click_item_seqno  INT,          "
                        +"     click_item_attr  VARCHAR(10)    "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dwd.search_ana_base',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        // unique stream-load label prefix per job run
                        + "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')"
        );

        //tEnv.executeSql("select * from filled").print();

        // write the enriched rows out; dt is derived from the epoch-millis event_time
        tEnv.executeSql("INSERT INTO search_ana_base_dorissink   " +
                "SELECT to_date(date_format(to_timestamp_ltz(event_time,3),'yyyy-MM-dd')) as dt,user_id," +
                "event_id,event_time,search_id,keyword,split_words,similar_word,return_item_count,click_item_seqno,click_item_attr  " +
                "from filled");

        env.execute();

    }


    /**
     * Flat search-event record: the union of the fields produced by the three
     * search-related events (search / search_return / search_click). Fields not
     * applicable to a given event type remain null.
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class SearchBean {
        Integer user_id;
        String event_id;
        Long event_time;           // epoch millis (see to_timestamp_ltz(event_time, 3) in the sink SQL)
        String search_id;          // lifted from properties["search_id"]
        String keyword;            // raw search keyword (search event)
        String split_words;        // word split returned by the simwords service
        String similar_word;       // standardized synonym returned by the simwords service
        Integer return_item_count; // result count (search_return event)
        Integer click_item_seqno;  // clicked item's position (search_click event)
        String click_item_attr;    // clicked item's source attribute: ad-inserted vs organic result
        Map<String, String> properties; // raw event properties as received from Kafka
    }

}
